/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
*/

package org.apache.griffin.measure.datasource.connector.batch

import java.net.URI
import java.util

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.http.client.HttpResponseException
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet}
import org.apache.http.impl.client.{BasicResponseHandler, HttpClientBuilder}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.ParamUtil._


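/**
 * Batch connector that reads Griffin metric documents from Elasticsearch over
 * the HTTP `_search` API and exposes the numeric fields found under each hit's
 * `_source.value` as a DataFrame of doubles.
 *
 * A sketch of the expected connector config (key names match the constants
 * declared below; the concrete values here are illustrative only):
 *
 * {{{
 *   "config": {
 *     "host": "es.example.com",
 *     "port": "9200",
 *     "index": "griffin",
 *     "type": "accuracy",
 *     "metric.name": "my_metric",
 *     "fields": ["total", "matched"],
 *     "size": 100
 *   }
 * }}}
 */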
case class ElasticSearchGriffinDataConnector(@transient sparkSession: SparkSession,
                                             dcParam: DataConnectorParam,
                                             timestampStorage: TimestampStorage) extends BatchDataConnector {

  // Lazy so that `host` and `port` (initialized below) are already set on first use
  lazy val getBaseUrl: String = s"http://$host:$port"
  val config: Map[String, Any] = dcParam.getConfig
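  // Keys recognized in the connector's "config" block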
  val Index = "index"
  val Type = "type"
  val Host = "host"
  val Port = "port"
  val EsVersion = "version"
  val Fields = "fields"
  val Size = "size"
  val MetricName = "metric.name"
  val index: String = config.getString(Index, "default")
  val dataType: String = config.getString(Type, "accuracy")
  val metricName: String = config.getString(MetricName, "*")
  val host: String = config.getString(Host, "")
  val version: String = config.getString(EsVersion, "")
  val port: String = config.getString(Port, "")
  val fields: Seq[String] = config.getStringArr(Fields, Seq[String]())
  val size: Int = config.getInt(Size, 100)

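  /**
   * Reads up to `size` metric documents for this connector and returns an
   * optional DataFrame of their numeric values plus the covered time range.
   */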
  override def data(ms: Long): (Option[DataFrame], TimeRange) = {

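    // Newest-first search for up to `size` documents matching the metric name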
    val path: String = s"/$index/$dataType/_search?sort=tmst:desc&q=name:$metricName&size=$size"
    info(s"ElasticSearchGriffinDataConnector data: host: $host port: $port path: $path")

    val dfOpt = try {
      val (ok, body) = httpRequest(path)
      val data = ArrayBuffer[Map[String, Number]]()

      if (ok) {
        val hits: util.Iterator[JsonNode] =
          parseString(body).get("hits").get("hits").elements()

        while (hits.hasNext) {
          val hit = hits.next()
          // Numeric metric values live under each hit's _source.value
          val values = hit.get("_source").get("value")
          val valueFields: util.Iterator[util.Map.Entry[String, JsonNode]] = values.fields()
          val fieldsMap = mutable.Map[String, Number]()
          while (valueFields.hasNext) {
            val fld: util.Map.Entry[String, JsonNode] = valueFields.next()
            fieldsMap.put(fld.getKey, fld.getValue.numberValue())
          }
          data += fieldsMap.toMap
        }
      }
      // Build rows in the configured column order, defaulting missing fields to 0.0
      val rdd1: RDD[Map[String, Number]] = sparkSession.sparkContext.parallelize(data)
      val columns: Array[String] = fields.toArray
      val defaultNumber: Number = 0.0
      val rdd: RDD[Row] = rdd1.map { x: Map[String, Number] =>
        Row(columns.map(c => x.getOrElse(c, defaultNumber).doubleValue()): _*)
      }
      val schema = dfSchema(columns.toList)
      val df: DataFrame = sparkSession.createDataFrame(rdd, schema).limit(size)
      preProcess(Some(df), ms)
    } catch {
      case NonFatal(e) =>
        error(s"load ES table $host:$port $index/$dataType fails: ${e.getMessage}", e)
        None
    }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }

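  /**
   * Issues a GET request against the cluster and returns (success, responseBody).
   * The client and response are closed once the body has been consumed.
   */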
  def httpRequest(path: String): (Boolean, String) = {
    val url: String = s"$getBaseUrl$path"
    info(s"url: $url")
    val uri: URI = new URI(url)
    val request = new HttpGet(uri)
    request.addHeader("Content-Type", "application/json")
    request.addHeader("Charset", "UTF-8")
    val client = HttpClientBuilder.create().build()
    try {
      val response: CloseableHttpResponse = client.execute(request)
      try {
        // BasicResponseHandler throws HttpResponseException for non-2xx responses
        val handler = new BasicResponseHandler()
        (true, Option(handler.handleResponse(response)).getOrElse("").trim)
      } finally {
        response.close()
      }
    } catch {
      case e: HttpResponseException =>
        error(s"request to $url failed: ${e.getMessage}", e)
        (false, e.getMessage)
    } finally {
      client.close()
    }
  }

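  /** Parses a JSON string into a Jackson tree. */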
  def parseString(data: String): JsonNode = {
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    // readTree(String) avoids the platform-charset getBytes round trip
    mapper.readTree(data)
  }

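  /** Builds a non-nullable all-DoubleType schema for the given column names. */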
  def dfSchema(columnNames: List[String]): StructType = {
    val structFields: Seq[StructField] = columnNames
      .map(x => StructField(name = x,
        dataType = DoubleType,
        nullable = false))
    StructType(structFields)
  }

}